The previous post covered token completion: instead of the original significant-text aggregation, we switched to completing the token by plain word frequency. Should the suggestion step be changed the same way?
I think keeping the significant-text aggregation for suggestions is the right choice, because there we genuinely do want to find the "important words" within a given scope, so that part stays as it is.
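To make the contrast concrete, here is a minimal sketch of the two aggregation bodies side by side (the field name follows the index used in this series; the include pattern, size and min_doc_count values are only illustrative). Note that a terms aggregation on a text field is only possible because the mapping below enables fielddata.
# word-frequency completion: rank candidate tokens purely by how often they occur
token_completion_agg = {
    "terms": {
        "field": "tweet",      # text field, requires fielddata
        "include": "cov.+",    # only tokens that extend the partial input
        "min_doc_count": 10,
        "size": 2
    }
}
# suggestion: rank tokens by how significant they are within the matched documents
suggestion_agg = {
    "significant_text": {
        "field": "tweet",
        "min_doc_count": 10,
        "size": 3
    }
}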
Finally, putting all of the previous modifications together, we end up with the following two Python scripts and two JSON files:
index_mapping.json
{
"properties": {
"name": {"type": "keyword"},
"user_id": {"type": "keyword"},
"tweet": {"type": "text", "analyzer": "index_analyzer", "fielddata": "true"},
"tweet_id": {"type": "keyword"},
"retweets": {"type": "integer"},
"favorites": {"type": "integer"},
"created": {"type": "date"},
"followers": {"type": "integer"},
"is_user_verified": {"type": "boolean"},
"geo": {"type": "geo_point"},
"location": {"type": "keyword"}
}}
index_setting.json
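index_setting.json only needs to define the custom index_analyzer that the mapping above refers to. The version below is just a minimal sketch to keep this post self-contained: the tokenizer and token filters here are assumptions, not necessarily the analyzer used in the earlier posts.
{
    "analysis": {
        "analyzer": {
            "index_analyzer": {
                "type": "custom",
                "tokenizer": "standard",
                "filter": ["lowercase", "asciifolding"]
            }
        }
    }
}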
index.py
import json
import elasticsearch
from datetime import datetime
from elasticsearch import helpers
index_mapping_file = 'index_mapping.json'
index_setting_file = 'index_setting.json'
data_folder = 'covid_20200330'
data_file_name = '2020-03-31_afternoon.json'
index_name = 'covid19_tweets'
# initiate es client
es_cli = elasticsearch.Elasticsearch("http://localhost:9200")
# read index mapping file
with open(index_mapping_file, 'r') as f:
    index_mapping = json.load(f)
# read index setting file
with open(index_setting_file, 'r') as f:
    index_setting = json.load(f)
# delete index
# es_cli.indices.delete(index=index_name)
# create index
r = es_cli.indices.create(index=index_name, mappings=index_mapping, settings=index_setting)
def data_to_es(json_file: str, index_name: str):
    with open(json_file, 'r') as f:
        data = json.load(f)
    # fields present in the raw data but not in the index mapping
    fields_to_rm = ['primary_location', 'geo_location']
    for d in data:
        for field in fields_to_rm:
            if d.get(field):
                d.pop(field)
        if d.get('geo'):
            # a geo_point given as an array must be [lon, lat], so reverse the stored order
            d['geo'] = d['geo']['coordinates'][::-1]
        if d.get('created'):
            d['created'] = datetime.strptime(d['created'], "%d-%b-%Y")
        # the bulk helper reads the target index from the _index key
        d['_index'] = index_name
        yield d
json_file = f'{data_folder}/{data_file_name}'
result = helpers.bulk(es_cli, data_to_es(json_file, index_name), raise_on_error=False)
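As a quick sanity check (a small addition on top of the script above, not part of the original): helpers.bulk with raise_on_error=False returns a tuple of the success count and the list of errors, and a count request confirms how many documents the index now holds.
# result is (number of successfully indexed documents, list of per-document errors)
print(f"indexed {result[0]} documents, {len(result[1])} errors")
print(es_cli.count(index=index_name)["count"])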
search.py
import elasticsearch
import re
def get_token_complete(kw_phrase, index_name):
    tail = re.findall(r"\S+", kw_phrase)[-1]  # the last, not-yet-finished token
    query_to_complete = {
        "query": {
            "prefix": {
                "tweet": {
                    "value": tail
                }
            }
        },
        "aggregations": {
            "my_sample": {
                "sampler": {
                    "shard_size": 1000
                },
                "aggregations": {
                    "find_the_whole_token": {
                        # complete the token by plain word frequency (terms aggregation)
                        "terms": {
                            "field": "tweet",
                            "include": f"{tail}.+",
                            "min_doc_count": 10,
                            "size": 2
                        }
                    }
                }
            }
        },
        "size": 0,
        "_source": False
    }
    r = es_cli.search(index=index_name, body=query_to_complete)
    buckets = r["aggregations"]["my_sample"]["find_the_whole_token"]["buckets"]
    tokens = [bucket["key"] for bucket in buckets]
    return tokens
def get_suggestion(search_phrase, index_name):
    suggestion_query = {
        "query": {
          "match": {
            "tweet": search_phrase
          }
        },
        "aggregations": {
            "my_sample": {
                "sampler": {
                    "shard_size": 1000
                },
                "aggregations": {
                    "tags": {
                        "significant_text": { 
                            "field": "tweet",
                            "exclude": search_phrase.split(" "),
                            "min_doc_count": 10,
                            "size": 3
                        }
                    }
                }
            }
        }
    }
    r = es_cli.search(index=index_name, body=suggestion_query)
    buckets = r["aggregations"]["my_sample"]["tags"]["buckets"]
    suggestions = [bucket["key"] for bucket in buckets]
    return suggestions
def get_auto_complete(search_phrase):
    pre_search_phrase = " ".join(re.findall(r"\S+", search_phrase)[0:-1])
    complete_tokens = get_token_complete(search_phrase, index_name)
    guess = []
    for token in complete_tokens:
        complete_search_phrase = " ".join([pre_search_phrase, token])
        suggestions = get_suggestion(complete_search_phrase, index_name)
        if not suggestions:
            guess.append(token)
        for suggestion in suggestions:
            guess.append(" ".join([complete_search_phrase, suggestion]).strip())
    return guess
es_cli = elasticsearch.Elasticsearch("http://localhost:9200")
index_name = 'covid19_tweets'
search_phrase = "cov"
print(get_auto_complete(search_phrase))
To recap the main optimizations made over these past few posts: token completion is now driven by plain word frequency, while suggestions still rely on the significant-text aggregation to surface the important words among the matched documents.
Although a search feature really ought to come with a front-end to show it off, I'm hopeless at front-end work, so if you think this approach suits your application, feel free to take it as the backend of your search and finish that last step for me XD
Up to this point, this pretty much fulfills the main purpose of starting this series: writing down how I implemented auto-complete with ES some time ago.
As I said at the beginning, ES has gained a dizzying number of new features in recent years, so I'd like to devote the upcoming posts to one of them: using ES as a vector DB.
The next post will start with an introduction to vector DBs.